home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / checkbox / plugins / backend_manager.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-10-12  |  6.3 KB  |  150 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import logging
  5. import gobject
  6. import dbus
  7. import dbus.service as dbus
  8. import dbus.mainloop.glib as dbus
  9. from checkbox.lib.environ import append_path
  10. from checkbox.properties import Int, String
  11.  
  12. class PermissionDeniedByPolicy(dbus.DBusException):
  13.     _dbus_error_name = 'com.ubuntu.checkbox.PermissionDeniedByPolicy'
  14.  
  15.  
  16. class UnknownRegistryException(dbus.DBusException):
  17.     _dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
  18.  
  19.  
  20. class UnknownTestException(dbus.DBusException):
  21.     _dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
  22.  
  23.  
  24. class BackendManager(dbus.service.Object):
  25.     DBUS_INTERFACE_NAME = 'com.ubuntu.checkbox'
  26.     DBUS_BUS_NAME = 'com.ubuntu.checkbox'
  27.     timeout = Int(default = 600)
  28.     path = String(default = '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin')
  29.     
  30.     def __init__(self):
  31.         self.dbus_info = None
  32.         self.polkit = None
  33.         self.loop = False
  34.         self.tests = { }
  35.  
  36.     
  37.     def __repr__(self):
  38.         return '<BackendManager>'
  39.  
  40.     
  41.     def register(self, manager):
  42.         import dbus.mainloop.glib as dbus
  43.         dbus.mainloop.glib.DBusGMainLoop(set_as_default = True)
  44.         self.bus = dbus.SystemBus()
  45.         self.dbus_name = dbus.service.BusName(self.DBUS_BUS_NAME, self.bus)
  46.         for path in self.path.split(':'):
  47.             append_path(path)
  48.         
  49.         self._manager = manager
  50.         for rt, rh in [
  51.             ('test-.*', self.test_all),
  52.             ('run', self.run)]:
  53.             self._manager.reactor.call_on(rt, rh)
  54.         
  55.  
  56.     
  57.     def get_registry(self, name, sender = None, conn = None):
  58.         self._check_polkit_privilege(sender, conn, 'com.ubuntu.checkbox.info')
  59.         if name not in self._manager.registry:
  60.             raise UnknownRegistryException, 'Registry not found: %s' % name
  61.         name not in self._manager.registry
  62.         return str(self._manager.registry[name])
  63.  
  64.     get_registry = dbus.service.method(DBUS_INTERFACE_NAME, in_signature = 's', out_signature = 's', sender_keyword = 'sender', connection_keyword = 'conn')(get_registry)
  65.     
  66.     def get_test_result(self, suite, name, sender = None, conn = None):
  67.         self._check_polkit_privilege(sender, conn, 'com.ubuntu.checkbox.test')
  68.         if (suite, name) not in self.tests:
  69.             raise UnknownTestException, 'Suite/test not found: %s/%s' % (suite, name)
  70.         (suite, name) not in self.tests
  71.         test = self.tests[(suite, name)]
  72.         result = test.command()
  73.         return (result.status, result.data, str(result.duration))
  74.  
  75.     get_test_result = dbus.service.method(DBUS_INTERFACE_NAME, in_signature = 'ss', out_signature = 'as', sender_keyword = 'sender', connection_keyword = 'conn')(get_test_result)
  76.     
  77.     def get_test_description(self, suite, name, sender = None, conn = None):
  78.         self._check_polkit_privilege(sender, conn, 'com.ubuntu.checkbox.test')
  79.         if (suite, name) not in self.tests:
  80.             raise UnknownTestException, 'Suite/test not found: %s/%s' % (suite, name)
  81.         (suite, name) not in self.tests
  82.         test = self.tests[(suite, name)]
  83.         return test.description()
  84.  
  85.     get_test_description = dbus.service.method(DBUS_INTERFACE_NAME, in_signature = 'ss', out_signature = 's', sender_keyword = 'sender', connection_keyword = 'conn')(get_test_description)
  86.     
  87.     def test_all(self, test):
  88.         self.tests[(test.suite, test.name)] = test
  89.  
  90.     
  91.     def run(self):
  92.         self._manager.reactor.fire('gather')
  93.         dbus.service.Object.__init__(self, self.bus, '/checkbox')
  94.         main_loop = gobject.MainLoop()
  95.         self.loop = False
  96.         if self.timeout:
  97.             
  98.             def _t():
  99.                 main_loop.quit()
  100.                 return True
  101.  
  102.             gobject.timeout_add(self.timeout * 1000, _t)
  103.         
  104.         while not self.loop:
  105.             if self.timeout:
  106.                 self.loop = True
  107.             
  108.             main_loop.run()
  109.  
  110.     
  111.     def _check_polkit_privilege(self, sender, conn, privilege):
  112.         '''Verify that sender has a given PolicyKit privilege.
  113.  
  114.         sender is the sender\'s (private) D-BUS name, such as ":1:42"
  115.         (sender_keyword in @dbus.service.methods). conn is
  116.         the dbus.Connection object (connection_keyword in
  117.         @dbus.service.methods). privilege is the PolicyKit privilege string.
  118.  
  119.         This method returns if the caller is privileged, and otherwise throws a
  120.         PermissionDeniedByPolicy exception.
  121.         '''
  122.         if sender is None and conn is None:
  123.             return None
  124.         if self.dbus_info is None:
  125.             self.dbus_info = dbus.Interface(conn.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus/Bus', False), 'org.freedesktop.DBus')
  126.         
  127.         pid = self.dbus_info.GetConnectionUnixProcessID(sender)
  128.         if self.polkit is None:
  129.             self.polkit = dbus.Interface(dbus.SystemBus().get_object('org.freedesktop.PolicyKit', '/', False), 'org.freedesktop.PolicyKit')
  130.         
  131.         
  132.         try:
  133.             res = self.polkit.IsProcessAuthorized(privilege, pid, True)
  134.         except dbus.DBusException:
  135.             e = None
  136.             if e._dbus_error_name == 'org.freedesktop.DBus.Error.ServiceUnknown':
  137.                 self.polkit = None
  138.                 return self._check_polkit_privilege(sender, conn, privilege)
  139.             raise 
  140.         except:
  141.             e._dbus_error_name == 'org.freedesktop.DBus.Error.ServiceUnknown'
  142.  
  143.         if res != 'yes':
  144.             logging.debug('_check_polkit_privilege: sender %s on connection %s pid %i requested %s: %s', sender, conn, pid, privilege, res)
  145.             raise PermissionDeniedByPolicy(privilege + ' ' + res)
  146.         res != 'yes'
  147.  
  148.  
  149. factory = BackendManager
  150.